#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/un.h>
#include <errno.h>
#include <pthread.h>
#include <getopt.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "limits.h"
#include "adt.h"
#include "ynet_rpc.h"
#include "sysy_lib.h"
#include "cluster.h"
#include "chunk.h"
#include "bmap.h"
#include "metadata.h"
#include "../../ynet/sock/sock_tcp.h"
#include "rpc_table.h"
#include "rpc_proto.h"
#include "net_table.h"
#include "configure.h"
#include "net_global.h"
#include "core.h"
#include "corenet.h"
#include "lich_md.h"
#include "sock_unix.h"
#include "squeue.h"
#include "cache.h"
#include "prof.h"
#include "ylog.h"
#include "dbg.h"

/* NOTE(review): not referenced anywhere in the visible part of this file. */
#define MSG_SIZE 64

/* TCP port used for listen/connect when USE_UNIX_DOMAIN is not defined. */
#define __prof_dio_port__ 27905

/*
 * Transport switch: when USE_UNIX_DOMAIN is defined the server listens on,
 * and the client connects to, the UNIX-domain socket at __LISTEN_PATH__;
 * otherwise TCP on __prof_dio_port__ is used. Flip the "#if 1" to change it.
 */
#if 1
#define USE_UNIX_DOMAIN
#endif

/*
 * Per-worker argument/statistics record handed to __prof_dio_cli_worker().
 */
typedef struct {
        sem_t sem;      /* not used in the visible code */
        uint64_t io;    /* completed requests; the worker adds `threads` per batch */
        int threads;    /* number of request headers sent per batch */
        int sendsize;   /* not used in the visible code */
        int recvsize;   /* not used in the visible code */
} prof_net_arg_t;

/*
 * Fixed-size header exchanged between client and server. The client sends
 * an array of these; the server answers each one with a header followed by
 * a PAGE_SIZE payload.
 */
typedef struct  {
        uint32_t magic; /* NOTE(review): never validated in the visible code */
        msgid_t msgid;  /* copied from the request into the reply */
        char buf[0];    /* zero-length trailing array; adds nothing to sizeof */
} msg_head_t;

/*
 * Context passed as arg1 to __rpc_request_post_sem(). In the visible code
 * only `sem` is touched (posted); the other members are referenced solely
 * from a disabled "#if 0" block.
 */
typedef struct {
        sem_t sem;      /* posted by __rpc_request_post_sem() */
        task_t task;
        int retval;
        buffer_t buf;
} rpc_ctx_t;

/* Server side: listening socket created by prof_dio_init(). */
static int __listen_sd__;
/* Client side: connected socket used by the worker thread. */
static int __client_sd__;
/*
 * Worker run flag: set to 1 by prof_dio() before the worker starts and back
 * to 0 when the measurement ends. It is a plain int read from another
 * thread without synchronization.
 */
static int __running__ = 0;

/* Address storage filled by sock_unix_listen() / sock_unix_connect(). */
struct sockaddr_un __listen_addr__;
struct sockaddr_un __conn_addr__;

/* Filesystem path of the UNIX-domain socket (USE_UNIX_DOMAIN builds). */
#define __LISTEN_PATH__ "/tmp/prof_dio.socket"

static void __prof_dio_reply(const sockid_t *sockid, const msg_head_t *req)
{
        msg_head_t rep;
        buffer_t buf;

        DBUG("reply %u\n", req->msgid.idx);

        rep.msgid = req->msgid;
        mbuffer_init(&buf, 0);
        mbuffer_appendmem(&buf, &rep, sizeof(rep));
        corenet_tcp_send(sockid, &buf, 0);
        mbuffer_init(&buf, 0);
        mbuffer_appendzero(&buf, PAGE_SIZE);
        corenet_tcp_send(sockid, &buf, 0);
}

/*
 * Completion callback: post the semaphore embedded in the rpc_ctx_t.
 *
 * @arg1: rpc_ctx_t * whose semaphore is posted
 * @arg2: int * status; read but only consumed by the disabled block below
 * @arg3: buffer_t * reply; only consumed by the disabled block below
 */
inline static void __rpc_request_post_sem(void *arg1, void *arg2, void *arg3)
{
        rpc_ctx_t *waiter = arg1;
        buffer_t *reply = arg3;
        int status = *(int *)arg2;

        /* silence unused-variable warnings while the block below is off */
        (void) status;
        (void) reply;
#if 0
        waiter->retval = status;
        if (reply) {
                mbuffer_merge(&waiter->buf, reply);
        } else {
                waiter->buf.len = 0;
        }
#endif

        sem_post(&waiter->sem);
}

/*
 * Issue one batch of @threads requests on @sd and wait for every reply.
 *
 * Sends @threads consecutive msg_head_t records starting at @head, then
 * reads from @sd until at least @threads * (sizeof(msg_head_t) + PAGE_SIZE)
 * bytes have arrived. Reply bytes land in a scratch buffer and are
 * discarded; only the running byte count is tracked.
 *
 * @sd:      connected stream socket
 * @head:    array of @threads request headers
 * @threads: number of requests in the batch, must be > 0
 *
 * Returns 0 on success or an errno value: EINVAL for a non-positive
 * @threads, ECONNRESET when the peer closes before all replies arrived,
 * otherwise the errno reported by send()/poll()/recv().
 */
static int __prof_rpc_request(int sd, msg_head_t *head, int threads)
{
        int ret, count;
        size_t left;
        ssize_t done;
        const char *pos;
        char buf[BIG_BUF_LEN];
        struct pollfd pfd;

        /* zero requests would wait forever; negative would wrap the length */
        if (threads <= 0) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        pfd.fd = sd;
        pfd.revents = 0;

        /*
         * send() may accept fewer bytes than asked for. Push the remainder
         * until the whole batch is out, otherwise the server sees fewer
         * requests than the reply loop below waits for.
         */
        pos = (const char *)head;
        left = sizeof(*head) * (size_t)threads;
        while (left > 0) {
                done = send(sd, pos, left, 0);
                if (done < 0) {
                        ret = errno;
                        if (ret == EINTR)
                                continue;

                        if (ret == EAGAIN || ret == EWOULDBLOCK) {
                                /* socket buffer full: wait until writable */
                                pfd.events = POLLOUT;
                                pfd.revents = 0;
                                (void) poll(&pfd, 1, -1);
                                continue;
                        }

                        GOTO(err_ret, ret);
                }

                pos += done;
                left -= (size_t)done;
        }

        pfd.events = POLLIN;

        count = 0;
        while (1) {
#if 1
                /* zero timeout: spin on poll() instead of sleeping in it */
                pfd.revents = 0;
                ret = poll(&pfd, 1, 0);
                if (ret == -1) {
                        ret = errno;
                        if (ret == EINTR)
                                continue;

                        GOTO(err_ret, ret);
                }

                if (pfd.revents == 0)
                        continue;
#endif

                ret = recv(sd, buf, BIG_BUF_LEN, 0);
                if (ret < 0) {
                        ret = errno;
                        if (ret == EINTR || ret == EAGAIN || ret == EWOULDBLOCK)
                                continue;

                        GOTO(err_ret, ret);
                }

                if (ret == 0) {
                        /* orderly shutdown by the peer before the batch completed */
                        ret = ECONNRESET;
                        GOTO(err_ret, ret);
                }

                count += ret;

                /* the server must never answer more requests than were sent */
                YASSERT(count / ((int)sizeof(*head) + PAGE_SIZE) <= threads);
                if (count / ((int)sizeof(*head) + PAGE_SIZE) == threads)
                        break;
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Task body scheduled by __prof_dio_newtask(): pull the request header out
 * of the queued buffer, send the reply, then release the request.
 *
 * @arg: rpc_request_t * allocated from MEM_CACHE_128; freed here
 */
static void __prof_dio_exec(void *arg)
{
        msg_head_t head;
        rpc_request_t *request = arg;

        mbuffer_popmsg(&request->buf, &head, sizeof(head));
        __prof_dio_reply(&request->sockid, &head);

        DBUG("exec\n");

        /* drop whatever is left in the buffer, then the request itself */
        mbuffer_free(&request->buf);
        mem_cache_free(MEM_CACHE_128, request);
}

/*
 * Per-connection server state: allocated in __prof_dio_accept__(), handed
 * to core_attach() as the context pointer, and released by
 * __prof_dio_close().
 */
typedef struct {
        sockid_t sockid;        /* identity of the accepted connection */
} dio_session_t;

/*
 * Input callback registered with core_attach(): split the received bytes
 * into header-sized requests and schedule one task per request.
 *
 * @ctx:    dio_session_t * of the connection
 * @buf:    buffer_t * holding the bytes received so far
 * @_count: out, number of tasks scheduled by this call
 *
 * Bytes short of a full msg_head_t are left in @buf. Always returns 0.
 */
static int __prof_dio_newtask(void *ctx, void *buf, int *_count)
{
        dio_session_t *session = ctx;
        buffer_t *in = buf;
        rpc_request_t *req;
        msg_head_t peek;
        int queued;

        DBUG("new task, sd %u buflen %u\n", session->sockid.sd, in->len);

        for (queued = 0; in->len >= sizeof(peek); queued++) {
                /* copy of the next header; its contents are not inspected */
                mbuffer_get(in, &peek, sizeof(peek));

                req = mem_cache_calloc(MEM_CACHE_128, 0);
                mbuffer_init(&req->buf, 0);
                /* move exactly one header's worth of bytes into the request */
                mbuffer_pop(in, &req->buf, sizeof(peek));
                req->sockid = session->sockid;

                schedule_task_new("prof_task", __prof_dio_exec, req, -1);
        }

        DBUG("new task %u\n", queued);

        *_count = queued;

        return 0;
}

/*
 * Close callback passed to core_attach() together with the session
 * pointer: releases the session with yfree().
 *
 * @arg: the dio_session_t * that was given to core_attach()
 */
static void __prof_dio_close(void *arg)
{
        void *session = arg;

        yfree((void **)&session);
}

static int __prof_dio_accept__()
{
        int ret, sd, addr;
        socklen_t alen;
        dio_session_t *se;

#ifdef USE_UNIX_DOMAIN
        struct sockaddr_un sin;
        _memset(&sin, 0, sizeof(sin));
        alen = sizeof(struct sockaddr_un);

        sd = accept(__listen_sd__, &sin, &alen);
        if (sd < 0 ) {
                ret = errno;
		GOTO(err_ret, ret);
        }

        ret = sock_unix_tuning(sd);
        if (unlikely(ret))
                GOTO(err_sd, ret);

        addr = 123;
#else
        struct sockaddr_in sin;

        _memset(&sin, 0, sizeof(sin));
        alen = sizeof(struct sockaddr_in);

        sd = accept(__listen_sd__, &sin, &alen);
        if (sd < 0 ) {
                ret = errno;
		GOTO(err_ret, ret);
        }

        ret = tcp_sock_tuning(sd, 1, 1);
        if (unlikely(ret))
                GOTO(err_sd, ret);

        addr = sin.sin_addr.s_addr;
#endif

        DINFO("accept new sd %u\n", sd);

        ret = ymalloc((void **)&se, sizeof(*se));
        if (unlikely(ret))
                GOTO(err_sd, ret);

        se->sockid.sd = sd;
        se->sockid.seq = _random();
        se->sockid.type = SOCKID_CORENET;
        se->sockid.addr = addr;

        ret = core_attach(0, &se->sockid, "dio_srv", se,
                          __prof_dio_newtask, __prof_dio_close, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_sd:
        (void) close(sd);
err_ret:
        return ret;
}

/*
 * Listener thread body: wait for __listen_sd__ to become readable and
 * accept connections one at a time.
 *
 * Poll timeouts and transient accept failures are retried; any other error
 * ends the loop and the thread.
 *
 * @_arg: unused
 * Always returns NULL.
 */
static void *__prof_dio_accept(void *_arg)
{
        int ret;

        (void) _arg;

        while (1) {
                ret = sock_poll_sd(__listen_sd__, 1000 * 1000, POLLIN);
                if (unlikely(ret)) {
                        if (ret == ETIMEDOUT || ret == ETIME)
                                continue;
                        else
                                GOTO(err_ret, ret);
                }

                ret = __prof_dio_accept__();
                if (unlikely(ret)) {
                        /*
                         * A connection that vanished between poll and
                         * accept, or an interrupted call, only affects that
                         * one attempt; it must not take the listener down.
                         */
                        if (ret == EINTR || ret == EAGAIN
                            || ret == ECONNABORTED)
                                continue;

                        GOTO(err_ret, ret);
                }
        }

err_ret:
        return NULL;
}

int prof_dio_init()
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;

#ifdef USE_UNIX_DOMAIN
        ret = sock_unix_listen(__LISTEN_PATH__, &__listen_sd__, &__listen_addr__);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("listen @ %s, sd %u\n", __LISTEN_PATH__, __listen_sd__);
#else
        char port[MAX_NAME_LEN];

        snprintf(port, MAX_NAME_LEN, "%d", __prof_dio_port__);
        ret = tcp_sock_hostlisten(&__listen_sd__, NULL, port,
                                  YNET_QLEN, YNET_RPC_BLOCK, 1);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }
#endif

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);
 
        ret = pthread_create(&th, &ta, __prof_dio_accept, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Client worker thread: while __running__ is non-zero, repeatedly send a
 * batch of arg->threads request headers on __client_sd__, wait for all
 * replies, and add arg->threads to arg->io.
 *
 * Allocation or request failures abort through UNIMPLEMENTED(__DUMP__).
 *
 * @_arg: prof_net_arg_t * supplied by prof_dio(); `threads` is read and
 *        `io` is updated
 * Always returns NULL.
 */
static void *__prof_dio_cli_worker(void *_arg)
{
        int ret;
        prof_net_arg_t *arg = _arg;
        msg_head_t *head;

#if 0
        ret = rpc_table_private_init();
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);
#endif

        ret = ymalloc((void **)&head, sizeof(*head) * arg->threads);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        /*
         * The headers are written to the socket exactly as they sit in
         * memory; clear them so no uninitialized heap bytes are sent.
         */
        memset(head, 0, sizeof(*head) * arg->threads);

        /* __running__ is a plain int cleared by prof_dio() from another thread */
        while (__running__) {
                ret = __prof_rpc_request(__client_sd__, head, arg->threads);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                arg->io += arg->threads;
        }

        /* release the request headers instead of leaking them on exit */
        yfree((void **)&head);

        return NULL;
}

/*
 * Print the request rate accumulated since @begin.
 *
 * Sums the `io` counters of the first @threads entries of @args, measures
 * the time elapsed since @begin with _time_used(), and prints
 * io * 1000000 / elapsed as "iops".
 * NOTE(review): the formula implies _time_used() returns microseconds --
 * confirm.
 *
 * @args:    array of worker records
 * @threads: number of entries in @args
 * @begin:   start of the measured interval
 */
static void __prof_dio_cli_print(prof_net_arg_t *args, int threads, struct timeval *begin)
{
        int i;
        uint64_t io, used;
        struct timeval now;

        io = 0;
        for (i = 0; i < threads; i++) {
                io += args[i].io;
        }

        _gettimeofday(&now, NULL);

        used = _time_used(begin, &now);

        /* print the full 64-bit values; casting to int would truncate them */
        DBUG("io %llu, used %llu\n", (unsigned long long)io,
             (unsigned long long)used);

        if (used == 0) {
                /* no measurable time elapsed: report 0 rather than divide by zero */
                printf("iops : %llu\n", 0ULL);
                return;
        }

        printf("iops : %llu\n", (unsigned long long)(io * 1000 * 1000 / used));
}

int prof_dio(const char *srv, int threads, int runtime)
{
        int ret, i, sd;
        net_handle_t nh;
        struct timeval last, begin;
        prof_net_arg_t args[1], *arg;
        pthread_t th;
        pthread_attr_t ta;
        char port[MAX_NAME_LEN];

        dbg_info(0);

        ret = env_init_simple("lichbd");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        analysis_init();

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

#ifdef USE_UNIX_DOMAIN
        (void) port;
        (void) srv;

        ret =  sock_unix_connect(__LISTEN_PATH__, &sd, &__conn_addr__);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        nh.u.sd.sd = sd;
        nh.u.sd.addr = 0;

        ret = sock_unix_tuning(sd);
        if (unlikely(ret))
                GOTO(err_ret, ret);

#if 0
        ret = sock_setblock(sd);
        if (unlikely(ret)) {
                DERROR("%d - %s\n", ret, strerror(ret));
                GOTO(err_ret, ret);
        }
#endif
#else        
        snprintf(port, MAX_NAME_LEN, "%d", __prof_dio_port__);

        ret = tcp_sock_hostconnect(&nh, srv, port, 0, 10, 0);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = tcp_sock_tuning(__client_sd__, 1, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif

        __client_sd__ = nh.u.sd.sd;
        __running__ = 1;

        arg = &args[0];
        arg->io = 0;
        arg->threads = threads;

        ret = pthread_create(&th, &ta, __prof_dio_cli_worker, arg);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        //end = gettime();

        i = 0;
        _gettimeofday(&begin, NULL);
        last = begin;
        while (i < runtime) {
                sleep(1);
                i++;
                __prof_dio_cli_print(args, 1, &last);
        }

        __running__ = 0;

#if 0
        analysis_dumpall();

        sleep(100000);
#endif

        return 0;
err_ret:
        return ret;
}
